/*-------------------------------------------------------------------------
 *
 * recursive_planning.c
 *
 * Logic for calling the postgres planner recursively for CTEs and
 * non-pushdownable subqueries in distributed queries.
 *
 * PostgreSQL with Citus can execute 4 types of queries:
 *
 * - Postgres queries on local tables and functions.
 *
 *   These queries can use all SQL features, but they may not reference
 *   distributed tables.
 *
 * - Router queries that can be executed on a single node by replacing
 *   table names with shard names.
 *
 *   These queries can use nearly all SQL features, but only if they have
 *   a single-valued filter on the distribution column.
 *
 * - Multi-shard queries that can be executed by performing a task for each
 *   shard in a distributed table and performing a merge step.
 *
 *   These queries have limited SQL support. They may only include
 *   subqueries if the subquery can be executed on each shard by replacing
 *   table names with shard names and concatenating the result.
 *
 * - Re-partition queries that can be executed through a tree of
 *   re-partitioning operations.
 *
 *   These queries have very limited SQL support and only support basic
 *   inner joins and subqueries without joins.
 *
 * To work around the limitations of these planners, we recursively call
 * the planner for CTEs and unsupported subqueries to obtain a list of
 * subplans.
 *
 * During execution, each subplan is executed separately through the method
 * that is appropriate for that query. The results are written to temporary
 * files on the workers. In the original query, the CTEs and subqueries are
 * replaced by mini-subqueries that read from the temporary files.
 *
 * This allows almost all SQL to be directly or indirectly supported,
 * because if all subqueries that contain distributed tables have been
 * replaced then what remains is a router query which can use nearly all
 * SQL features.
 *
 * Copyright (c) Citus Data, Inc.
 *-------------------------------------------------------------------------
 */

#include "postgres.h"

#include "funcapi.h"

#include "catalog/pg_class.h"
#include "catalog/pg_type.h"
#include "lib/stringinfo.h"
#include "nodes/makefuncs.h"
#include "nodes/nodeFuncs.h"
#include "nodes/nodes.h"
#include "nodes/relation.h"
#include "nodes/pg_list.h"
#include "nodes/primnodes.h"
#include "optimizer/clauses.h"
#include "optimizer/planner.h"
#include "optimizer/prep.h"
#include "optimizer/var.h"
#include "parser/parse_relation.h"
#include "parser/parsetree.h"
#include "utils/builtins.h"
#include "utils/guc.h"
#include "utils/lsyscache.h"

#include "pg_version_constants.h"

#include "distributed/citus_nodes.h"
#include "distributed/citus_ruleutils.h"
#include "distributed/commands/multi_copy.h"
#include "distributed/distributed_planner.h"
#include "distributed/errormessage.h"
#include "distributed/listutils.h"
#include "distributed/local_distributed_join_planner.h"
#include "distributed/log_utils.h"
#include "distributed/metadata_cache.h"
#include "distributed/multi_logical_optimizer.h"
#include "distributed/multi_logical_planner.h"
#include "distributed/multi_physical_planner.h"
#include "distributed/multi_router_planner.h"
#include "distributed/multi_server_executor.h"
#include "distributed/query_colocation_checker.h"
#include "distributed/query_pushdown_planning.h"
#include "distributed/recursive_planning.h"
#include "distributed/relation_restriction_equivalence.h"
#include "distributed/shard_pruning.h"
#include "distributed/session_ctx.h"
#include "distributed/version_compat.h"

#include "type_cast.h"
/*
 * RecursivePlanningContext is used to recursively plan subqueries
 * and CTEs, pull results to the coordinator, and push it back into
 * the workers.
 *
 * NOTE(review): the RecursivePlanningContext name used throughout this file
 * is presumably a typedef of this struct declared in a header; verify in
 * distributed/recursive_planning.h.
 */
struct RecursivePlanningContextInternal {
    /* nesting level; GenerateSubplansForSubqueriesAndCTEs initialises it to 0 */
    int level;
    /* id of the distributed plan for which subplans are being generated */
    uint64 planId;
    /*
     * true when AllDistributionKeysInQueryAreEqual() holds for the whole
     * original query; lets ShouldRecursivelyPlanNonColocatedSubqueries skip
     * per-subquery colocation checks
     */
    bool allDistributionKeysInQueryAreEqual; /* used for some optimizations */
    /*
     * subplans generated so far (starts as NIL and is returned to the caller);
     * presumably DistributedSubPlan* elements, verify
     */
    List* subPlanList;
    /* planner restriction info used for colocation and outer-join decisions */
    PlannerRestrictionContext* plannerRestrictionContext;
};

/*
 * CteReferenceWalkerContext is used to collect CTE references in
 * CteReferenceListWalker.
 */
typedef struct CteReferenceWalkerContext {
    /* presumably the query nesting level during the walk; verify in the walker */
    int level;
    /*
     * CTE references collected by the walker; element type not visible
     * here (presumably RangeTblEntry* of kind RTE_CTE, verify)
     */
    List* cteReferenceList;
} CteReferenceWalkerContext;

/*
 * VarLevelsUpWalkerContext is used to find Vars in a (sub)query that
 * refer to upper levels and therefore cannot be planned separately.
 */
typedef struct VarLevelsUpWalkerContext {
    /*
     * presumably the current query nesting depth that a Var's levelsup is
     * compared against; verify in ContainsReferencesToOuterQueryWalker
     */
    int level;
} VarLevelsUpWalkerContext;

/*
 * Local (file-static) function forward declarations. Each prototype must stay
 * in sync with its definition further down in this file.
 */
static DeferredErrorMessage* RecursivelyPlanSubqueriesAndCTEs(
    Query* query, RecursivePlanningContext* context);
static bool ShouldRecursivelyPlanNonColocatedSubqueries(
    Query* subquery, RecursivePlanningContext* context);
static bool ContainsSubquery(Query* query);
static bool ShouldRecursivelyPlanOuterJoins(RecursivePlanningContext* context);
static void RecursivelyPlanNonColocatedSubqueries(Query* subquery,
                                                  RecursivePlanningContext* context);
static void RecursivelyPlanNonColocatedJoinWalker(
    Node* joinNode, ColocatedJoinChecker* colocatedJoinChecker,
    RecursivePlanningContext* recursivePlanningContext);
static void RecursivelyPlanNonColocatedSubqueriesInWhere(
    Query* query, ColocatedJoinChecker* colocatedJoinChecker,
    RecursivePlanningContext* recursivePlanningContext);
static bool RecursivelyPlanRecurringTupleOuterJoinWalker(
    Node* node, Query* query, RecursivePlanningContext* context);
static void RecursivelyPlanDistributedJoinNode(Node* node, Query* query,
                                               RecursivePlanningContext* context);
static bool IsRTERefRecurring(RangeTblRef* rangeTableRef, Query* query);
static List* SublinkListFromWhere(Query* originalQuery);
static bool ExtractSublinkWalker(Node* node, List** sublinkList);
static bool ShouldRecursivelyPlanSublinks(Query* query);
static bool RecursivelyPlanAllSubqueries(Node* node,
                                         RecursivePlanningContext* planningContext);
static DeferredErrorMessage* RecursivelyPlanCTEs(Query* query,
                                                 RecursivePlanningContext* context);
static bool RecursivelyPlanSubqueryWalker(Node* node, RecursivePlanningContext* context);
static bool ShouldRecursivelyPlanSubquery(Query* subquery,
                                          RecursivePlanningContext* context);
static bool AllDistributionKeysInSubqueryAreEqual(
    Query* subquery, PlannerRestrictionContext* restrictionContext);
static bool ShouldRecursivelyPlanSetOperation(Query* query,
                                              RecursivePlanningContext* context);
static bool RecursivelyPlanSubquery(Query* subquery,
                                    RecursivePlanningContext* planningContext);
static void RecursivelyPlanSetOperations(Query* query, Node* node,
                                         RecursivePlanningContext* context);
static bool IsLocalTableRteOrMatView(Node* node);
static DistributedSubPlan* CreateDistributedSubPlan(uint32 subPlanId,
                                                    Query* subPlanQuery);
static bool CteReferenceListWalker(Node* node, CteReferenceWalkerContext* context);
static bool ContainsReferencesToOuterQueryWalker(Node* node,
                                                 VarLevelsUpWalkerContext* context);
static bool NodeContainsSubqueryReferencingOuterQuery(Node* node);
static void WrapFunctionsInSubqueries(Query* query);
static void TransformFunctionRTE(RangeTblEntry* rangeTblEntry);
static bool ShouldTransformRTE(RangeTblEntry* rangeTableEntry);
static Query* BuildReadIntermediateResultsQuery(List* targetEntryList,
                                                List* columnAliasList,
                                                Const* resultIdConst, Oid functionOid,
                                                bool useBinaryCopyFormat);
static Query* CreateOuterSubquery(RangeTblEntry* rangeTableEntry,
                                  List* outerSubqueryTargetList);
static List* GenerateRequiredColNamesFromTargetList(List* targetList);
static char* GetRelationNameAndAliasName(RangeTblEntry* rangeTablentry);
/* file-local helper; its definition immediately follows these declarations */
static TupleDesc get_expr_result_tupdesc(Node* expr, bool noError);
/*
 * get_expr_result_tupdesc
 *		Get a tupdesc describing the result of a composite-valued expression
 *
 * Returns the tuple descriptor that get_expr_result_type() reports when it
 * classifies the expression as TYPEFUNC_COMPOSITE.
 *
 * Otherwise (expression is not composite, or its rowtype can't be determined):
 *   - if noError is true, returns NULL;
 *   - if noError is false, raises ERRCODE_WRONG_OBJECT_TYPE. The message is
 *     "type %s is not composite" for a non-RECORD type, and "record type has
 *     not been registered" for a RECORD whose rowtype cannot be resolved.
 *
 * This is a simpler version of get_expr_result_type() for use when the caller
 * is only interested in determinate rowtype results.  As with that function,
 * beware of using this on the funcexpr of a RTE that has a coldeflist.
 *
 * The definition repeats the `static` of the forward declaration so that the
 * internal linkage is explicit here rather than only implied.
 */
static TupleDesc get_expr_result_tupdesc(Node* expr, bool noError)
{
    TupleDesc tupleDesc = NULL;
    TypeFuncClass functypclass = get_expr_result_type(expr, NULL, &tupleDesc);

    if (functypclass == TYPEFUNC_COMPOSITE) {
        return tupleDesc;
    }

    if (!noError) {
        Oid exprTypeId = exprType(expr);

        if (exprTypeId != RECORDOID) {
            ereport(ERROR,
                    (errcode(ERRCODE_WRONG_OBJECT_TYPE),
                     errmsg("type %s is not composite", format_type_be(exprTypeId))));
        } else {
            /* RECORD, but no registered rowtype could be resolved for it */
            ereport(ERROR, (errcode(ERRCODE_WRONG_OBJECT_TYPE),
                            errmsg("record type has not been registered")));
        }
    }

    return NULL;
}
/*
 * GenerateSubplansForSubqueriesAndCTEs drives recursive planning for one
 * distributed query. It hands originalQuery to RecursivelyPlanSubqueriesAndCTEs()
 * and returns the subplans collected along the way (NIL when none were needed).
 * For the details of when/how subplans are generated, see
 * RecursivelyPlanSubqueriesAndCTEs().
 *
 * originalQuery is modified in place whenever subplans are generated.
 *
 * While it runs, the session's recursivePlanningDepth counter is incremented.
 * It is decremented again both on normal return and just before a deferred
 * planning error is raised.
 */
List* GenerateSubplansForSubqueriesAndCTEs(
    uint64 planId, Query* originalQuery,
    PlannerRestrictionContext* plannerRestrictionContext)
{
    RecursivePlanningContext context;

    Session_ctx::PlanCtx().recursivePlanningDepth++;

    /* state shared by every level of the recursive planning walk */
    context.plannerRestrictionContext = plannerRestrictionContext;
    context.planId = planId;
    context.level = 0;
    context.subPlanList = NIL;

    /*
     * Computing distribution key equality for the whole query up front is a
     * trade-off. If the full query has it, every subquery has it too, so lower
     * levels can skip their own checks. If it does not, the answer is useless
     * and each subquery, as well as the joins among subqueries, must be
     * checked individually anyway.
     */
    context.allDistributionKeysInQueryAreEqual =
        AllDistributionKeysInQueryAreEqual(originalQuery, plannerRestrictionContext);

    DeferredErrorMessage* planningError =
        RecursivelyPlanSubqueriesAndCTEs(originalQuery, &context);
    if (planningError != NULL) {
        /*
         * Undo the depth increment before raising. At ERROR level the raise is
         * expected not to return to this frame.
         */
        Session_ctx::PlanCtx().recursivePlanningDepth--;
        RaiseDeferredError(planningError, ERROR);
    }

    bool generatedSubPlans = (context.subPlanList != NIL);
    if (generatedSubPlans && IsLoggableLevel(DEBUG1)) {
        StringInfo rewrittenQueryString = makeStringInfo();

        pg_get_query_def(originalQuery, rewrittenQueryString);
        ereport(DEBUG1, (errmsg("Plan " UINT64_FORMAT
                                " query after replacing subqueries and CTEs: %s",
                                planId, rewrittenQueryString->data)));
    }

    Session_ctx::PlanCtx().recursivePlanningDepth--;

    return context.subPlanList;
}

/*
 * RecursivelyPlanSubqueriesAndCTEs finds subqueries and CTEs that cannot be pushed down
 * to workers directly and instead plans them by recursively calling the planner and
 * adding the subplan to subPlanList.
 *
 * Subplans are executed prior to the distributed plan and the results are written
 * to temporary files on workers.
 *
 * CTE references are replaced by a subquery on the read_intermediate_result
 * function, which reads from the temporary file.
 *
 * The passes below run in a fixed order. Each one sees the query as rewritten
 * by the previous ones, so the order matters:
 *   1. CTEs (RecursivelyPlanCTEs); an error from this pass is returned at once.
 *   2. If subquery_pushdown is enabled, stop here and return NULL.
 *   3. Wrap function calls in joins (WrapFunctionsInSubqueries), then descend
 *      into subqueries with RecursivelyPlanSubqueryWalker.
 *   4. Set operations, when ShouldRecursivelyPlanSetOperation says so.
 *   5. Subqueries in HAVING. An error is returned if any of them references
 *      the outer query.
 *   6. Non-colocated subqueries, when
 *      ShouldRecursivelyPlanNonColocatedSubqueries says so.
 *   7. Local / citus local tables, when ShouldConvertLocalTableJoinsToSubqueries
 *      says so.
 *   8. Outer joins with a recurring outer side, when the query has outer joins.
 *   9. Sublinks in WHERE and SELECT, when ShouldRecursivelyPlanSublinks says so.
 *
 * Parameters:
 *   query   - the query to plan; modified in place as subqueries/CTEs are replaced.
 *   context - recursive planning state; generated subplans end up in
 *             context->subPlanList.
 *
 * If recursive planning results in an error then the error is returned. Otherwise,
 * NULL is returned and the subplans will have been added to subPlanList.
 */
static DeferredErrorMessage* RecursivelyPlanSubqueriesAndCTEs(
    Query* query, RecursivePlanningContext* context)
{
    DeferredErrorMessage* error = RecursivelyPlanCTEs(query, context);
    if (error != NULL) {
        return error;
    }

    if (Session_ctx::Vars().SubqueryPushdown) {
        /*
         * When the subquery_pushdown flag is enabled we make some hacks
         * to push down subqueries with LIMIT. Recursive planning would
         * valiantly do the right thing and try to recursively plan the
         * inner subqueries, but we don't really want it to because those
         * subqueries might not be supported and would be much slower.
         *
         * Instead, we skip recursive planning altogether when
         * subquery_pushdown is enabled.
         */
        return NULL;
    }

    /* make sure function calls in joins are executed in the coordinator */
    WrapFunctionsInSubqueries(query);

    /* descend into subqueries */
    query_tree_walker(query, walker_cast0(RecursivelyPlanSubqueryWalker), context, 0);

    /*
     * At this point, all CTEs, leaf subqueries containing local tables and
     * non-pushdownable subqueries have been replaced. We now check for
     * combinations of subqueries that cannot be pushed down (e.g.
     * <subquery on reference table> UNION <subquery on distributed table>).
     *
     * This code also runs for the top-level query, which allows us to support
     * top-level set operations.
     */

    if (ShouldRecursivelyPlanSetOperation(query, context)) {
        RecursivelyPlanSetOperations(query, (Node*)query->setOperations, context);
    }

    if (query->havingQual != NULL) {
        /* correlated HAVING subqueries cannot be planned separately */
        if (NodeContainsSubqueryReferencingOuterQuery(query->havingQual)) {
            return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED,
                                 "Subqueries in HAVING cannot refer to outer query", NULL,
                                 NULL);
        }

        RecursivelyPlanAllSubqueries(query->havingQual, context);
    }

    /*
     * If the query doesn't have distribution key equality,
     * recursively plan some of its subqueries.
     */
    if (ShouldRecursivelyPlanNonColocatedSubqueries(query, context)) {
        RecursivelyPlanNonColocatedSubqueries(query, context);
    }

    if (ShouldConvertLocalTableJoinsToSubqueries(query->rtable)) {
        /*
         * Logical planner cannot handle "local_table" [OUTER] JOIN "dist_table", or
         * a query with local table/citus local table and subquery. We convert local/citus
         * local tables to a subquery until they can be planned.
         */
        RecursivelyPlanLocalTableJoins(query, context);
    }

    /*
     * Similarly, logical planner cannot handle outer joins when the outer rel
     * is recurring, such as "<recurring> LEFT JOIN <distributed>". In that case,
     * we convert distributed table into a subquery and recursively plan inner
     * side of the outer join. That way, inner rel gets converted into an intermediate
     * result and logical planner can handle the new query since it's of the form
     * "<recurring> LEFT JOIN <recurring>".
     */
    if (ShouldRecursivelyPlanOuterJoins(context)) {
        RecursivelyPlanRecurringTupleOuterJoinWalker((Node*)query->jointree, query,
                                                     context);
    }

    /*
     * If the FROM clause is recurring (does not contain a distributed table),
     * then we cannot have any distributed tables appearing in subqueries in
     * the SELECT and WHERE clauses.
     *
     * We do the sublink conversions at the end of the recursive planning
     * because earlier steps might have transformed the query into a
     * shape that needs recursively planning the sublinks.
     */
    if (ShouldRecursivelyPlanSublinks(query)) {
        /* replace all subqueries in the WHERE clause */
        if (query->jointree && query->jointree->quals) {
            RecursivelyPlanAllSubqueries((Node*)query->jointree->quals, context);
        }

        /* replace all subqueries in the SELECT clause */
        RecursivelyPlanAllSubqueries((Node*)query->targetList, context);
    }

    return NULL;
}

/*
 * GetPlannerRestrictionContext is a plain accessor for the
 * PlannerRestrictionContext stored in a recursive planning context.
 * NOTE(review): presumably exported for code outside this file that cannot
 * see the struct definition; confirm against callers.
 */
PlannerRestrictionContext* GetPlannerRestrictionContext(
    RecursivePlanningContext* recursivePlanningContext)
{
    PlannerRestrictionContext* restrictionContext =
        recursivePlanningContext->plannerRestrictionContext;

    return restrictionContext;
}

/*
 * ShouldRecursivelyPlanNonColocatedSubqueries decides whether the input query
 * contains joins that are not on the distribution key and therefore needs
 * some of its subqueries planned recursively.
 *
 * By the time this runs, every leaf subquery has already been recursively
 * planned. What is really being checked is whether the joins *among* the
 * remaining subqueries are on the distribution key.
 *
 * All of the following must hold for the answer to be true (checked in this
 * order, stopping at the first failure):
 *   - the whole query is not already known to equate all distribution keys
 *     (if it were, no subquery could be non-colocated);
 *   - the query contains subqueries at all. Queries without them take the
 *     regular planning path, and non-colocated subquery joins can only occur
 *     among subqueries;
 *   - no local table or materialized view appears in the range table, since
 *     direct joins with local tables are not supported by any Citus planner;
 *   - the query itself does not equate all distribution keys.
 */
static bool ShouldRecursivelyPlanNonColocatedSubqueries(Query* subquery,
                                                        RecursivePlanningContext* context)
{
    return !context->allDistributionKeysInQueryAreEqual &&
           ContainsSubquery(subquery) &&
           !FindNodeMatchingCheckFunctionInRangeTableList(subquery->rtable,
                                                          IsLocalTableRteOrMatView) &&
           !AllDistributionKeysInSubqueryAreEqual(subquery,
                                                  context->plannerRestrictionContext);
}

/*
 * ContainsSubquery returns true if the input query contains any subqueries
 * in its FROM clause (join tree) or in its WHERE or HAVING clauses.
 *
 * (The HAVING clause is included because the second check below is
 * WhereOrHavingClauseContainsSubquery, not a WHERE-only check.)
 */
static bool ContainsSubquery(Query* query)
{
    /* subqueries reachable from the join tree, i.e. the FROM clause */
    if (JoinTreeContainsSubquery(query)) {
        return true;
    }

    /* sublinks in the WHERE or HAVING clauses */
    return WhereOrHavingClauseContainsSubquery(query);
}

/*
 * ShouldRecursivelyPlanOuterJoins reports whether the join restriction
 * context reachable from the given recursive planning context records that
 * the query has outer join(s). Such joins might need to be recursively
 * planned.
 *
 * Raises ERROR if the context, its planner restriction context, or the join
 * restriction context is missing.
 */
static bool ShouldRecursivelyPlanOuterJoins(RecursivePlanningContext* context)
{
    bool haveJoinRestrictions =
        context != NULL && context->plannerRestrictionContext != NULL &&
        context->plannerRestrictionContext->joinRestrictionContext != NULL;

    if (!haveJoinRestrictions) {
        ereport(ERROR, (errmsg("unexpectedly got NULL pointer in recursive "
                               "planning context")));
    }

    return context->plannerRestrictionContext->joinRestrictionContext->hasOuterJoin;
}

/*
 * RecursivelyPlanNonColocatedSubqueries handles a query whose subqueries are
 * not all joined on their distribution keys. It recursively plans some of
 * those subqueries so that the remaining query becomes executable by Citus.
 *
 * A colocation checker picks an anchor relation. Every subquery that is not
 * colocated with the anchor is then recursively planned. Subqueries are
 * visited in two passes:
 *   1. the FROM clause (jointree->fromlist and nested join expressions);
 *   2. sublinks in the WHERE clause (jointree->quals).
 *
 * Outer joins get no special treatment here, so the outer side of an outer
 * join (e.g. the LEFT side of a LEFT JOIN) may end up recursively planned.
 * That is a deliberate simplification; outer joins are handled by a separate
 * pass over the join tree.
 */
static void RecursivelyPlanNonColocatedSubqueries(Query* subquery,
                                                  RecursivePlanningContext* context)
{
    FromExpr* fromClause = subquery->jointree;
    ColocatedJoinChecker colocatedJoinChecker =
        CreateColocatedJoinChecker(subquery, context->plannerRestrictionContext);

    /* rare case: no anchor range table entry could be picked, so give up */
    if (colocatedJoinChecker.anchorRelationRestrictionList == NIL) {
        return;
    }

    /* pass 1: subqueries in the FROM clause */
    RecursivelyPlanNonColocatedJoinWalker((Node*)fromClause, &colocatedJoinChecker,
                                          context);

    /* pass 2: subqueries in the WHERE clause */
    RecursivelyPlanNonColocatedSubqueriesInWhere(subquery, &colocatedJoinChecker,
                                                 context);
}

/*
 * RecursivelyPlanNonColocatedJoinWalker walks a FROM-clause join node (a
 * FromExpr, JoinExpr or RangeTblRef) looking for subquery range table entries.
 *
 * Each subquery found is tested for colocation with the anchor held by
 * colocatedJoinChecker. A subquery that is not colocated is recursively
 * planned. Non-subquery range table entries are ignored, and a NULL node is
 * a no-op.
 */
static void RecursivelyPlanNonColocatedJoinWalker(
    Node* joinNode, ColocatedJoinChecker* colocatedJoinChecker,
    RecursivePlanningContext* recursivePlanningContext)
{
    if (joinNode == NULL) {
        return;
    }

    switch (nodeTag(joinNode)) {
        case T_FromExpr: {
            FromExpr* fromExpr = (FromExpr*)joinNode;
            ListCell* fromCell = NULL;

            /* any FROM element may hide subqueries, possibly under joins */
            foreach (fromCell, fromExpr->fromlist) {
                RecursivelyPlanNonColocatedJoinWalker((Node*)lfirst(fromCell),
                                                      colocatedJoinChecker,
                                                      recursivePlanningContext);
            }
            break;
        }

        case T_JoinExpr: {
            JoinExpr* joinExpr = (JoinExpr*)joinNode;

            /* left subtree first, then right subtree */
            RecursivelyPlanNonColocatedJoinWalker(joinExpr->larg, colocatedJoinChecker,
                                                  recursivePlanningContext);
            RecursivelyPlanNonColocatedJoinWalker(joinExpr->rarg, colocatedJoinChecker,
                                                  recursivePlanningContext);
            break;
        }

        case T_RangeTblRef: {
            RangeTblRef* rangeTableRef = (RangeTblRef*)joinNode;
            RangeTblEntry* rte = rt_fetch(rangeTableRef->rtindex,
                                          colocatedJoinChecker->subquery->rtable);

            /* only subqueries are considered for now */
            bool isNonColocatedSubquery =
                rte->rtekind == RTE_SUBQUERY &&
                !SubqueryColocated(rte->subquery, colocatedJoinChecker);

            if (isNonColocatedSubquery) {
                RecursivelyPlanSubquery(rte->subquery, recursivePlanningContext);
            }
            break;
        }

        default:
            pg_unreachable();
    }
}

/*
 * RecursivelyPlanNonColocatedSubqueriesInWhere visits the sublinks that
 * SublinkListFromWhere() extracts from the query's WHERE clause.
 *
 * Each sublink's subselect is tested for colocation with the anchor held by
 * colocatedJoinChecker, and recursively planned if it is not colocated.
 * Sublinks without a subselect are skipped.
 */
static void RecursivelyPlanNonColocatedSubqueriesInWhere(
    Query* query, ColocatedJoinChecker* colocatedJoinChecker,
    RecursivePlanningContext* recursivePlanningContext)
{
    List* whereSublinks = SublinkListFromWhere(query);
    ListCell* cell = NULL;

    foreach (cell, whereSublinks) {
        SubLink* sublink = (SubLink*)lfirst(cell);
        Query* subselect = (Query*)sublink->subselect;

        /* a NULL subselect is not expected, but skip it defensively */
        if (subselect != NULL && !SubqueryColocated(subselect, colocatedJoinChecker)) {
            RecursivelyPlanSubquery(subselect, recursivePlanningContext);
        }
    }
}

/*
 * RecursivelyPlanRecurringTupleOuterJoinWalker descends into a join tree and
 * recursively plans all non-recurring (i.e., distributed) rels that
 * participate in an outer join expression together with a recurring rel,
 * such as <distributed> in "<recurring> LEFT JOIN <distributed>", i.e.,
 * where the recurring rel causes returning recurring tuples from the worker
 * nodes.
 *
 * Parameters:
 *   node  - a FromExpr (top-level call), JoinExpr or RangeTblRef.
 *   query - the query whose rtable the RangeTblRefs index into.
 *   recursivePlanningContext - state passed on to RecursivelyPlanDistributedJoinNode.
 *
 * Returns true if given node is recurring. A NULL node or a FromExpr returns
 * false. Any other node type, or an unexpected join type, raises ERROR.
 *
 * How recurrence propagates:
 *   LEFT JOIN  - recurring iff the left side is recurring.
 *   RIGHT JOIN - recurring iff the right side is recurring.
 *   FULL JOIN  - recurring iff either side is recurring.
 *   INNER JOIN - recurring iff both sides are recurring.
 *
 * See RecursivelyPlanDistributedJoinNode() function for the explanation on
 * what does it mean for a node to be "recurring" or "distributed".
 */
static bool RecursivelyPlanRecurringTupleOuterJoinWalker(
    Node* node, Query* query, RecursivePlanningContext* recursivePlanningContext)
{
    if (node == NULL) {
        return false;
    } else if (IsA(node, FromExpr)) {
        FromExpr* fromExpr = (FromExpr*)node;
        ListCell* fromExprCell;

        /* search for join trees in each FROM element */
        foreach (fromExprCell, fromExpr->fromlist) {
            Node* fromElement = (Node*)lfirst(fromExprCell);

            RecursivelyPlanRecurringTupleOuterJoinWalker(fromElement, query,
                                                         recursivePlanningContext);
        }

        /*
         * Can only appear during the top-level call and top-level callers
         * are not interested in the return value. Even more, we can't tell
         * whether a FromExpr is recurring or not.
         */
        return false;
    } else if (IsA(node, JoinExpr)) {
        JoinExpr* joinExpr = (JoinExpr*)node;

        Node* leftNode = joinExpr->larg;
        Node* rightNode = joinExpr->rarg;

        /*
         * There may be recursively plannable outer joins deeper in the join tree.
         *
         * We first handle the sub join trees and then the top level one since the
         * top level join expression might not require recursive planning after
         * handling the sub join trees.
         *
         * The left subtree is always processed before the right one; both
         * recursive calls may rewrite the query and emit DEBUG1 messages.
         */
        bool leftNodeRecurs = RecursivelyPlanRecurringTupleOuterJoinWalker(
            leftNode, query, recursivePlanningContext);
        bool rightNodeRecurs = RecursivelyPlanRecurringTupleOuterJoinWalker(
            rightNode, query, recursivePlanningContext);
        switch (joinExpr->jointype) {
            case JOIN_LEFT: {
                /* <recurring> left join <distributed> */
                if (leftNodeRecurs && !rightNodeRecurs) {
                    ereport(DEBUG1, (errmsg("recursively planning right side of "
                                            "the left join since the outer side "
                                            "is a recurring rel")));
                    RecursivelyPlanDistributedJoinNode(rightNode, query,
                                                       recursivePlanningContext);
                }

                /*
                 * A LEFT JOIN is recurring if the lhs is recurring.
                 * Note that we might have converted the rhs into a recurring
                 * one too if the lhs is recurring, but this anyway has no
                 * effects when deciding whether a LEFT JOIN is recurring.
                 */
                return leftNodeRecurs;
            }

            case JOIN_RIGHT: {
                /* <distributed> right join <recurring> */
                if (!leftNodeRecurs && rightNodeRecurs) {
                    ereport(DEBUG1, (errmsg("recursively planning left side of "
                                            "the right join since the outer side "
                                            "is a recurring rel")));
                    RecursivelyPlanDistributedJoinNode(leftNode, query,
                                                       recursivePlanningContext);
                }

                /*
                 * Similar to LEFT JOINs, a RIGHT JOIN is recurring if the rhs
                 * is recurring.
                 */
                return rightNodeRecurs;
            }

            case JOIN_FULL: {
                /*
                 * <recurring> full join <distributed>
                 * <distributed> full join <recurring>
                 */
                if (leftNodeRecurs && !rightNodeRecurs) {
                    ereport(DEBUG1, (errmsg("recursively planning right side of "
                                            "the full join since the other side "
                                            "is a recurring rel")));
                    RecursivelyPlanDistributedJoinNode(rightNode, query,
                                                       recursivePlanningContext);
                } else if (!leftNodeRecurs && rightNodeRecurs) {
                    ereport(DEBUG1, (errmsg("recursively planning left side of "
                                            "the full join since the other side "
                                            "is a recurring rel")));
                    RecursivelyPlanDistributedJoinNode(leftNode, query,
                                                       recursivePlanningContext);
                }

                /*
                 * An OUTER JOIN is recurring if any sides of the join is
                 * recurring. As in other outer join types, it doesn't matter
                 * whether the other side was / became recurring or not.
                 */
                return leftNodeRecurs || rightNodeRecurs;
            }

            case JOIN_INNER: {
                /*
                 * We don't need to recursively plan non-outer joins and we
                 * already descended into sub join trees to handle outer joins
                 * buried in them.
                 */
                return leftNodeRecurs && rightNodeRecurs;
            }

            default: {
                ereport(ERROR, (errmsg("got unexpected join type (%d) when recursively "
                                       "planning a join",
                                       joinExpr->jointype)));
            }
        }
    } else if (IsA(node, RangeTblRef)) {
        /* leaf: recurrence is decided per range table entry */
        return IsRTERefRecurring((RangeTblRef*)node, query);
    } else {
        ereport(ERROR, (errmsg("got unexpected node type (%d) when recursively "
                               "planning a join",
                               nodeTag(node))));
    }

    /* not expected to be reached: every branch above returns or raises ERROR */
    return false;
}

/*
 * RecursivelyPlanDistributedJoinNode is a helper for
 * RecursivelyPlanRecurringTupleOuterJoinWalker. It recursively plans the given
 * "distributed" node, which is known to be the inner side of an outer join.
 *
 * Planning is not possible when the distributed join node references the
 * recurring one. We deliberately do not throw an error for that case here;
 * DeferredErrorIfUnsupportedRecurringTuplesJoin is left to do so, with a
 * better error message.
 *
 * A node is "distributed" when it points to a distributed table or to a more
 * complex object (a join tree or a subquery) that can be pushed down to the
 * worker nodes directly. For a join, this means either an INNER join where
 * any side is a distributed table / distributed sub join tree, or an OUTER
 * join whose outer side is a distributed table / distributed sub join tree.
 *
 * Errors out on node types other than JoinExpr and RangeTblRef, and on range
 * table entries that are neither relations nor subqueries.
 */
static void RecursivelyPlanDistributedJoinNode(
    Node* node, Query* query, RecursivePlanningContext* recursivePlanningContext)
{
    if (IsA(node, JoinExpr)) {
        /*
         * A sub join tree, e.g. the "<distributed> INNER JOIN <distributed>"
         * inner side of:
         *   <recurring> LEFT JOIN (<distributed> JOIN <distributed>)
         *
         * XXX: Ideally we would move such a sub join tree into a subquery "as
         *      a whole", but that requires rebuilding the rtable and
         *      re-pointing every Var to the new rtable indexes, which is not
         *      implemented yet.
         *
         *      Instead we recursively plan every distributed table in the sub
         *      join tree separately. This is less efficient (the sub join tree
         *      can no longer be pushed down to the workers as a whole) but is
         *      easier to implement.
         */
        JoinExpr* subJoinTree = (JoinExpr*)node;

        RecursivelyPlanDistributedJoinNode(subJoinTree->larg, query,
                                           recursivePlanningContext);
        RecursivelyPlanDistributedJoinNode(subJoinTree->rarg, query,
                                           recursivePlanningContext);
        return;
    }

    if (!IsA(node, RangeTblRef)) {
        ereport(ERROR, (errmsg("unexpected join node type (%d)", nodeTag(node))));
    }

    RangeTblRef* rteRef = (RangeTblRef*)node;

    /*
     * Top-level callers never pass recurring nodes, but the recursion above
     * reaches them while walking sub join trees; leave those untouched.
     */
    if (IsRTERefRecurring(rteRef, query)) {
        return;
    }

    RangeTblEntry* distributedRte = rt_fetch(rteRef->rtindex, query->rtable);

    switch (distributedRte->rtekind) {
        case RTE_RELATION: {
            ereport(DEBUG1, (errmsg("recursively planning distributed relation %s "
                                    "since it is part of a distributed join node "
                                    "that is outer joined with a recurring rel",
                                    GetRelationNameAndAliasName(distributedRte))));

            List* requiredAttributes = RequiredAttrNumbersForRelation(
                distributedRte, GetPlannerRestrictionContext(recursivePlanningContext));

            ReplaceRTERelationWithRteSubquery(distributedRte, requiredAttributes,
                                              recursivePlanningContext, NULL);
            break;
        }

        case RTE_SUBQUERY: {
            /*
             * The subquery text is not logged here: RecursivelyPlanSubquery
             * logs it itself whenever it does not reference the outer query.
             */
            ereport(DEBUG1, (errmsg("recursively planning the distributed subquery "
                                    "since it is part of a distributed join node "
                                    "that is outer joined with a recurring rel")));

            bool recursivelyPlanned =
                RecursivelyPlanSubquery(distributedRte->subquery, recursivePlanningContext);

            if (!recursivelyPlanned) {
                /*
                 * RecursivelyPlanSubquery only refuses a subquery that
                 * references the outer query. Outer joins such as
                 * <recurring LEFT OUTER distributed> therefore cannot be planned
                 * when they are LATERAL joins whose distributed side is a
                 * subquery referencing the outer side, as in:
                 *
                 *   SELECT * FROM reference
                 *   LEFT JOIN LATERAL
                 *   (SELECT * FROM distributed WHERE reference.b > distributed.b) q
                 *   USING (a);
                 */
                Assert(ContainsReferencesToOuterQuery(distributedRte->subquery));
            }
            break;
        }

        default: {
            /*
             * RecursivelyPlanRecurringTupleOuterJoinWalker is not expected to
             * ask us to plan any other kind of RTE.
             */
            ereport(ERROR, (errmsg("got unexpected RTE type (%d) when recursively "
                                   "planning a join",
                                   distributedRte->rtekind)));
        }
    }
}

/*
 * IsRTERefRecurring returns true if the range table entry referenced by the
 * given RangeTblRef is "recurring", i.e. nothing reachable from it points to
 * a distributed table.
 *
 * For an rte that points to a table, this means the table is not
 * distributed. For an rte that points to something else (e.g. a subquery),
 * it means none of the rtes belonging to it point to a distributed table.
 *
 * It is safe to treat a subquery as non-recurring as soon as it references a
 * distributed table anywhere in its tree. For example, take subquery (q) of:
 *   SELECT * FROM ref LEFT JOIN (SELECT * FROM ref LEFT JOIN dist) q
 * It may look wrong to call IsRTERefRecurring on (q). It is not, because this
 * function runs during recursive planning. The recursive planner processes
 * queries bottom-up, so every query that references distributed tables and
 * cannot be pushed down has already been recursively planned. By the time (q)
 * is checked, it has been rewritten to:
 *   SELECT * FROM ref LEFT JOIN
 *     (SELECT * FROM ref LEFT JOIN
 *        (SELECT * FROM read_intermediate_result()) dist_1)
 * So we never wrongly call (SELECT * FROM ref LEFT JOIN dist) a distributed
 * subquery merely because it references a distributed table.
 */
static bool IsRTERefRecurring(RangeTblRef* rangeTableRef, Query* query)
{
    RangeTblEntry* referencedRte = rt_fetch(rangeTableRef->rtindex, query->rtable);

    /* search only the single referenced rte (and whatever it contains) */
    List* singleRteList = list_make1(referencedRte);
    bool referencesDistributedTable =
        FindNodeMatchingCheckFunctionInRangeTableList(singleRteList, IsDistributedTableRTE);

    return !referencesDistributedTable;
}

/*
 * SublinkListFromWhere returns the SubLink nodes found in the WHERE clause
 * (jointree quals) of the given query, or NIL when the query has no jointree.
 *
 * Call it on the original query: postgres standard_planner() may turn
 * subqueries in the WHERE clause into joins.
 */
static List* SublinkListFromWhere(Query* originalQuery)
{
    FromExpr* fromExpr = originalQuery->jointree;
    if (fromExpr == NULL) {
        return NIL;
    }

    List* foundSublinks = NIL;
    ExtractSublinkWalker(fromExpr->quals, &foundSublinks);

    return foundSublinks;
}

/*
 * ExtractSublinkWalker walks a quals expression tree and appends every SubLink
 * it encounters to *sublinkList. It does not descend into a SubLink once the
 * SubLink has been collected. It only stops early if expression_tree_walker
 * reports so; the SubLink case itself never stops the walk.
 */
static bool ExtractSublinkWalker(Node* node, List** sublinkList)
{
    if (node == NULL) {
        return false;
    }

    if (!IsA(node, SubLink)) {
        return expression_tree_walker(node, walker_cast0(ExtractSublinkWalker),
                                      sublinkList);
    }

    /* collect the sublink itself; its contents are not walked */
    *sublinkList = lappend(*sublinkList, node);
    return false;
}

/*
 * ShouldRecursivelyPlanSublinks returns true when the query's FROM clause is
 * recurring, i.e. no entry in its range table points to a distributed table.
 */
static bool ShouldRecursivelyPlanSublinks(Query* query)
{
    bool fromClauseHasDistributedTable =
        FindNodeMatchingCheckFunctionInRangeTableList(query->rtable,
                                                      IsDistributedTableRTE);

    return !fromClauseHasDistributedTable;
}

/*
 * RecursivelyPlanAllSubqueries walks an expression tree and recursively plans
 * every subquery whose range table references a Citus table. Once a Query
 * node has been handled, the walker does not descend any further into it.
 * It always returns false so that the walk is never cut short.
 */
static bool RecursivelyPlanAllSubqueries(Node* node,
                                         RecursivePlanningContext* planningContext)
{
    if (node == NULL) {
        return false;
    }

    if (!IsA(node, Query)) {
        return expression_tree_walker(node, walker_cast0(RecursivelyPlanAllSubqueries),
                                      planningContext);
    }

    Query* subquery = (Query*)node;
    bool referencesCitusTable =
        FindNodeMatchingCheckFunctionInRangeTableList(subquery->rtable, IsCitusTableRTE);

    if (referencesCitusTable) {
        RecursivelyPlanSubquery(subquery, planningContext);
    }

    return false;
}

/*
 * RecursivelyPlanCTEs plans all CTEs of the given query by recursively calling
 * the planner on each of them. Every resulting subplan is appended to
 * planningContext->subPlanList. Every RTE_CTE reference to a planned CTE is
 * turned into an RTE_SUBQUERY that calls read_intermediate_result, which reads
 * the intermediate result of the CTE after the CTE has been executed.
 *
 * Both SELECT CTEs and modifying CTEs are planned. For a modifying CTE, its
 * RETURNING list becomes the target list of the replacement subquery.
 * Unreferenced SELECT CTEs are skipped. Unreferenced non-SELECT CTEs still
 * get a subplan.
 *
 * Returns NULL on success, after clearing query->cteList. Returns a deferred
 * error (instead of raising one) for recursive CTEs and for CTEs that
 * reference an outer query. In the latter case, CTEs earlier in cteList may
 * already have been planned and replaced.
 */
static DeferredErrorMessage* RecursivelyPlanCTEs(
    Query* query, RecursivePlanningContext* planningContext)
{
    ListCell* cteCell = NULL;
    /*
     * level starts at -1 because CteReferenceListWalker increments it when it
     * enters the top-level Query, so that query is walked at level 0.
     */
    CteReferenceWalkerContext context = {-1, NIL};

    if (query->cteList == NIL) {
        /* no CTEs, nothing to do */
        return NULL;
    }

    if (query->hasRecursive) {
        return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED,
                             "recursive CTEs are only supported when they "
                             "contain a filter on the distribution column",
                             NULL, NULL);
    }

    /* get all RTE_CTEs that point to CTEs from cteList */
    CteReferenceListWalker((Node*)query, &context);

    foreach (cteCell, query->cteList) {
        CommonTableExpr* cte = (CommonTableExpr*)lfirst(cteCell);
        char* cteName = cte->ctename;
        Query* subquery = (Query*)cte->ctequery;
        uint64 planId = planningContext->planId;
        List* cteTargetList = NIL;
        ListCell* rteCell = NULL;
        int replacedCtesCount = 0;

        /* a CTE that reaches outside itself cannot be planned separately */
        if (ContainsReferencesToOuterQuery(subquery)) {
            return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED,
                                 "CTEs that refer to other subqueries are not "
                                 "supported in multi-shard queries",
                                 NULL, NULL);
        }

        if (cte->cterefcount == 0 && subquery->commandType == CMD_SELECT) {
            /*
             * SELECT CTEs that aren't referenced aren't executed in postgres.
             * We don't need to generate a subplan for it and can take the rest
             * of this iteration off.
             */
            continue;
        }

        /* subplan ids are 1-based positions in planningContext->subPlanList */
        uint32 subPlanId = list_length(planningContext->subPlanList) + 1;

        if (IsLoggableLevel(DEBUG1)) {
            StringInfo subPlanString = makeStringInfo();
            pg_get_query_def(subquery, subPlanString);
            ereport(DEBUG1,
                    (errmsg("generating subplan " UINT64_FORMAT "_%u for CTE %s: %s",
                            planId, subPlanId, cteName, subPlanString->data)));
        }

        /* build a sub plan for the CTE */
        DistributedSubPlan* subPlan = CreateDistributedSubPlan(subPlanId, subquery);
        planningContext->subPlanList = lappend(planningContext->subPlanList, subPlan);

        /* build the result_id parameter for the call to read_intermediate_result */
        char* resultId = GenerateResultId(planId, subPlanId);

        if (subquery->returningList) {
            /* modifying CTE with returning */
            cteTargetList = subquery->returningList;
        } else {
            /* regular SELECT CTE */
            cteTargetList = subquery->targetList;
        }

        /* replace references to the CTE with a subquery that reads results */
        Query* resultQuery =
            BuildSubPlanResultQuery(cteTargetList, cte->aliascolnames, resultId);

        foreach (rteCell, context.cteReferenceList) {
            RangeTblEntry* rangeTableEntry = (RangeTblEntry*)lfirst(rteCell);

            if (rangeTableEntry->rtekind != RTE_CTE) {
                /*
                 * This RTE pointed to a preceding CTE that was already replaced
                 * by a subplan.
                 */
                continue;
            }

            /* CTE names are identifiers, so NAMEDATALEN bounds the comparison */
            if (strncmp(rangeTableEntry->ctename, cteName, NAMEDATALEN) == 0) {
                /*
                 * change the RTE_CTE into an RTE_SUBQUERY; the CTE-specific
                 * fields no longer apply and are reset
                 */
                rangeTableEntry->rtekind = RTE_SUBQUERY;
                rangeTableEntry->ctename = NULL;
                rangeTableEntry->ctelevelsup = 0;

                if (replacedCtesCount == 0) {
                    /*
                     * Replace the first CTE reference with the result query directly.
                     */
                    rangeTableEntry->subquery = resultQuery;
                } else {
                    /*
                     * Replace subsequent CTE references with a copy of the result
                     * query, so that no two RTEs share the same Query node.
                     */
                    rangeTableEntry->subquery =
                        static_cast<Query*>(copyObject(resultQuery));
                }

                replacedCtesCount++;
            }
        }

        /* every reference postgres counted must have been rewritten */
        Assert(cte->cterefcount == replacedCtesCount);
    }

    /*
     * All CTEs are now executed through subplans and RTE_CTEs pointing
     * to the CTE list have been replaced with subqueries. We can now
     * clear the cteList.
     */
    query->cteList = NIL;

    return NULL;
}

/*
 * RecursivelyPlanSubqueryWalker finds every Query node below the given node
 * and recursively plans it when necessary.
 *
 * The work is bottom-up. Subqueries and CTEs nested inside a Query are handled
 * first, one level deeper (context->level is raised for the duration). Only
 * then is the Query itself considered for recursive planning. A deferred error
 * from the nested step is raised as an ERROR. The walker does not descend
 * into a Query after handling it.
 */
static bool RecursivelyPlanSubqueryWalker(Node* node, RecursivePlanningContext* context)
{
    if (node == NULL) {
        return false;
    }

    if (!IsA(node, Query)) {
        return expression_tree_walker(node, walker_cast0(RecursivelyPlanSubqueryWalker),
                                      context);
    }

    Query* query = (Query*)node;

    /* plan nested subqueries and CTEs first, one level deeper */
    context->level++;
    DeferredErrorMessage* nestedError = RecursivelyPlanSubqueriesAndCTEs(query, context);
    if (nestedError != NULL) {
        RaiseDeferredError(nestedError, ERROR);
    }
    context->level--;

    /* then this query itself, if it cannot be pushed down */
    if (ShouldRecursivelyPlanSubquery(query, context)) {
        RecursivelyPlanSubquery(query, context);
    }

    return false;
}

/*
 * ShouldRecursivelyPlanSubquery decides whether the given subquery should be
 * recursively planned.
 *
 * Returns true when any of the following holds:
 *   - the subquery references a local table or materialized view;
 *   - Citus cannot push the subquery down;
 *   - it can be pushed down, but neither the whole query nor this subquery
 *     joins all relations on their distribution keys.
 * Returns false otherwise.
 */
static bool ShouldRecursivelyPlanSubquery(Query* subquery,
                                          RecursivePlanningContext* context)
{
    if (FindNodeMatchingCheckFunctionInRangeTableList(subquery->rtable,
                                                      IsLocalTableRteOrMatView)) {
        /*
         * Postgres can always plan queries that don't require distributed
         * planning. This check has to come first; otherwise the many other
         * Citus planner functions called below would error out on local
         * relations.
         *
         * TODO: We could only successfully create distributed plans with local
         * tables when the local tables are on the leaf queries and the upper
         * level queries do not contain any other local tables.
         */
        return true;
    }

    if (!CanPushdownSubquery(subquery, false)) {
        return true;
    }

    /*
     * The subquery is pushdownable. If the query as a whole already has
     * distribution key equality, pushdown is safe. Recursive planning is much
     * more expensive than pushdown, so we avoid it.
     */
    if (context->allDistributionKeysInQueryAreEqual) {
        return false;
    }

    /*
     * Otherwise check this individual subquery. If its relations are not all
     * joined on their distribution keys, it cannot be pushed down and has to
     * be recursively planned.
     */
    return !AllDistributionKeysInSubqueryAreEqual(subquery,
                                                  context->plannerRestrictionContext);
}

/*
 * AllDistributionKeysInSubqueryAreEqual wraps AllDistributionKeysInQueryAreEqual().
 * It narrows the planner restrictions down to those relevant for the given
 * subquery and runs the distribution key equality check on them.
 *
 * Subqueries with CTEs are not supported by this check and always yield false.
 */
static bool AllDistributionKeysInSubqueryAreEqual(
    Query* subquery, PlannerRestrictionContext* restrictionContext)
{
    if (subquery->cteList != NIL) {
        return false;
    }

    PlannerRestrictionContext* subqueryRestrictions =
        FilterPlannerRestrictionForQuery(restrictionContext, subquery);

    return AllDistributionKeysInQueryAreEqual(subquery, subqueryRestrictions);
}

/*
 * ShouldRecursivelyPlanSetOperation determines whether the leaf queries of the
 * query's set operation tree must be recursively planned for the query as a
 * whole to be supported.
 *
 * Returns false for queries without set operations. Otherwise the leaves are
 * planned recursively when any of these holds, checked in this order:
 *   - the set operation is at the top level (it cannot be pushed down, so
 *     planning the leaves turns the query into a router query);
 *   - the operation is not a UNION (only UNION can be pushed down);
 *   - the UNION is unsupported, e.g. it mixes recurring and non-recurring
 *     leaves (if one leaf is recurring, all of them need to be);
 *   - the distribution column is not in the same position on every side of
 *     the UNION, so distribution column equivalence cannot be established.
 */
static bool ShouldRecursivelyPlanSetOperation(Query* query,
                                              RecursivePlanningContext* context)
{
    SetOperationStmt* setOperationTree = (SetOperationStmt*)query->setOperations;
    if (setOperationTree == NULL) {
        return false;
    }

    bool isTopLevel = (context->level == 0);

    return isTopLevel ||
           setOperationTree->op != SETOP_UNION ||
           DeferErrorIfUnsupportedUnionQuery(query) != NULL ||
           !SafeToPushdownUnionSubquery(
               query, FilterPlannerRestrictionForQuery(context->plannerRestrictionContext,
                                                       query));
}

/*
 * RecursivelyPlanSetOperations walks a tree of set operations (e.g. UNION,
 * INTERSECT). It recursively plans every leaf that is a subquery containing
 * a distributed table. Any node that is neither a SetOperationStmt nor a
 * RangeTblRef is an error.
 */
static void RecursivelyPlanSetOperations(Query* query, Node* node,
                                         RecursivePlanningContext* context)
{
    if (IsA(node, RangeTblRef)) {
        /* leaf: an entry of the outer query's range table */
        int leafIndex = ((RangeTblRef*)node)->rtindex;
        RangeTblEntry* leafRte = rt_fetch(leafIndex, query->rtable);
        Query* leafQuery = leafRte->subquery;

        bool isDistributedSubquery =
            leafRte->rtekind == RTE_SUBQUERY &&
            FindNodeMatchingCheckFunction((Node*)leafQuery, IsDistributedTableRTE);

        if (isDistributedSubquery) {
            RecursivelyPlanSubquery(leafQuery, context);
        }
    } else if (IsA(node, SetOperationStmt)) {
        SetOperationStmt* setOperationNode = (SetOperationStmt*)node;

        RecursivelyPlanSetOperations(query, setOperationNode->larg, context);
        RecursivelyPlanSetOperations(query, setOperationNode->rarg, context);
    } else {
        ereport(ERROR, (errmsg("unexpected node type (%d) while "
                               "expecting set operations or "
                               "range table references",
                               nodeTag(node))));
    }
}

/*
 * IsLocalTableRteOrMatView returns true when the node is a relation range
 * table entry (views excluded) that points to a postgres local table, a
 * citus local table, or a materialized view. Returns false for NULL and for
 * any other node.
 */
static bool IsLocalTableRteOrMatView(Node* node)
{
    if (node == NULL || !IsA(node, RangeTblEntry)) {
        return false;
    }

    RangeTblEntry* relationRte = (RangeTblEntry*)node;
    if (relationRte->rtekind != RTE_RELATION || relationRte->relkind == RELKIND_VIEW) {
        return false;
    }

    return IsRelationLocalTableOrMatView(relationRte->relid);
}

/*
 * IsRelationLocalTableOrMatView returns true if the given relation is a citus
 * local table, or is not a Citus table at all (a postgres local table or a
 * materialized view).
 */
bool IsRelationLocalTableOrMatView(Oid relationId)
{
    return !IsCitusTable(relationId) || IsCitusTableType(relationId, CITUS_LOCAL_TABLE);
}

/*
 * RecursivelyPlanSubquery plans the given subquery as a separate distributed
 * subplan and rewrites the subquery in place into the result query built by
 * BuildSubPlanResultQuery, which reads the subplan's intermediate result.
 *
 * A subquery that references outer queries cannot be planned separately. In
 * that case the function leaves the subquery untouched and returns false;
 * the planner later decides what to do with it. Returns true once the
 * subquery has been replaced.
 *
 * The new subplan is appended to planningContext->subPlanList, and its id is
 * the new length of that list.
 */
static bool RecursivelyPlanSubquery(Query* subquery,
                                    RecursivePlanningContext* planningContext)
{
    uint64 planId = planningContext->planId;
    Query* debugQuery = NULL;

    if (ContainsReferencesToOuterQuery(subquery)) {
        elog(DEBUG2, "skipping recursive planning for the subquery since it "
                     "contains references to outer queries");

        return false;
    }

    /*
     * The subquery is handed to the planner below. To deparse it properly in
     * the debug message, keep an unplanned copy of it.
     */
    if (IsLoggableLevel(DEBUG1)) {
        debugQuery = static_cast<Query*>(copyObject(subquery));
    }

    /*
     * Create the subplan and append it to the list in the planning context.
     * The id is unsigned to match CreateDistributedSubPlan's parameter and the
     * %u conversion used when logging it below.
     */
    uint32 subPlanId = list_length(planningContext->subPlanList) + 1;

    DistributedSubPlan* subPlan = CreateDistributedSubPlan(subPlanId, subquery);
    planningContext->subPlanList = lappend(planningContext->subPlanList, subPlan);

    /* build the result_id parameter for the call to read_intermediate_result */
    char* resultId = GenerateResultId(planId, subPlanId);

    /*
     * BuildSubPlanResultQuery() can optionally use provided column aliases.
     * We do not need to send additional alias list for subqueries.
     */
    Query* resultQuery = BuildSubPlanResultQuery(subquery->targetList, NIL, resultId);

    /*
     * Only deparse when the copy above was actually taken, so that a log
     * level change in between can never make us deparse a NULL query.
     */
    if (debugQuery != NULL && IsLoggableLevel(DEBUG1)) {
        StringInfo subqueryString = makeStringInfo();

        pg_get_query_def(debugQuery, subqueryString);

        ereport(DEBUG1, (errmsg("generating subplan " UINT64_FORMAT "_%u for subquery %s",
                                planId, subPlanId, subqueryString->data)));
    }

    /* finally update the input subquery to point to the result query */
    *subquery = *resultQuery;
    return true;
}

/*
 * CreateDistributedSubPlan plans the given query from the top by calling the
 * planner recursively. The result may be a local plan or another distributed
 * plan, which can itself contain subplans. It is wrapped in a new
 * DistributedSubPlan with the given id.
 */
static DistributedSubPlan* CreateDistributedSubPlan(uint32 subPlanId, Query* subPlanQuery)
{
    /*
     * Queries that call read_intermediate_result must go through distributed
     * planning even if no distributed tables are left in them. This cannot be
     * decided inside the planner itself, because the workers would then also
     * attempt distributed planning.
     */
    bool readsIntermediateResults =
        ContainsReadIntermediateResultFunction((Node*)subPlanQuery);
    int cursorOptions = readsIntermediateResults ? CURSOR_OPT_FORCE_DISTRIBUTED : 0;

    DistributedSubPlan* distributedSubPlan = CitusMakeNode(DistributedSubPlan);
    distributedSubPlan->plan = planner(subPlanQuery, cursorOptions, NULL);
    distributedSubPlan->subPlanId = subPlanId;

    return distributedSubPlan;
}

/*
 * CteReferenceListWalker collects RTE_CTE range table entries whose
 * ctelevelsup equals the walker's current nesting level, appending them to
 * context->cteReferenceList.
 *
 * context->level is raised by one while the walker is inside each Query. This
 * means references from nested subqueries are matched against the same
 * top-level CTE list. The walk is never aborted (always returns false unless
 * expression_tree_walker says otherwise).
 */
static bool CteReferenceListWalker(Node* node, CteReferenceWalkerContext* context)
{
    if (node == NULL) {
        return false;
    }

    if (IsA(node, Query)) {
        context->level += 1;
        query_tree_walker((Query*)node, walker_cast0(CteReferenceListWalker), context,
                          QTW_EXAMINE_RTES);
        context->level -= 1;

        return false;
    }

    if (!IsA(node, RangeTblEntry)) {
        return expression_tree_walker(node, walker_cast0(CteReferenceListWalker),
                                      context);
    }

    RangeTblEntry* candidateRte = (RangeTblEntry*)node;
    bool isMatchingCteReference =
        candidateRte->rtekind == RTE_CTE && candidateRte->ctelevelsup == context->level;

    if (isMatchingCteReference) {
        context->cteReferenceList = lappend(context->cteReferenceList, candidateRte);
    }

    /* the caller (query_tree_walker) descends into the range table entry */
    return false;
}

/*
 * ContainsReferencesToOuterQuery returns true if anything inside the given
 * query (Vars, aggregates, placeholder vars, grouping functions) points to an
 * enclosing query. Such queries cannot be planned recursively.
 */
bool ContainsReferencesToOuterQuery(Query* query)
{
    VarLevelsUpWalkerContext walkerContext = {0};

    return query_tree_walker(query, walker_cast0(ContainsReferencesToOuterQueryWalker),
                             &walkerContext, 0);
}

/*
 * ContainsReferencesToOuterQueryWalker returns true as soon as it finds a
 * Var, Aggref, GroupingFunc or PlaceHolderVar whose levels-up value exceeds
 * context->level, i.e. one that points outside the query being checked.
 *
 * The walker raises context->level by one for the duration of each nested
 * Query. Vars and GroupingFuncs are treated as leaves. The arguments of
 * Aggrefs and PlaceHolderVars that do not point outward are still searched.
 */
static bool ContainsReferencesToOuterQueryWalker(Node* node,
                                                 VarLevelsUpWalkerContext* context)
{
    if (node == NULL) {
        return false;
    }

    if (IsA(node, Query)) {
        context->level += 1;
        bool referencesOuterQuery = query_tree_walker(
            (Query*)node, walker_cast0(ContainsReferencesToOuterQueryWalker), context, 0);
        context->level -= 1;

        return referencesOuterQuery;
    }

    if (IsA(node, Var)) {
        return ((Var*)node)->varlevelsup > context->level;
    }

    if (IsA(node, GroupingFunc)) {
        return ((GroupingFunc*)node)->agglevelsup > context->level;
    }

    if (IsA(node, Aggref) && ((Aggref*)node)->agglevelsup > context->level) {
        return true;
    }

    if (IsA(node, PlaceHolderVar) &&
        ((PlaceHolderVar*)node)->phlevelsup > context->level) {
        return true;
    }

    /* everything else, including local Aggrefs/PHVs: search the children */
    return expression_tree_walker(
        node, walker_cast0(ContainsReferencesToOuterQueryWalker), context);
}

/*
 * NodeContainsSubqueryReferencingOuterQuery returns true if any SubLink found
 * in the given expression (as collected by ExtractSublinkWalker) has a
 * subselect that references a query outside itself.
 */
static bool NodeContainsSubqueryReferencingOuterQuery(Node* node)
{
    List* sublinkList = NIL;
    ExtractSublinkWalker(node, &sublinkList);

    SubLink* currentSublink = NULL;
    foreach_declared_ptr(currentSublink, sublinkList)
    {
        Query* sublinkQuery = castNode(Query, currentSublink->subselect);
        bool referencesOuterQuery = ContainsReferencesToOuterQuery(sublinkQuery);

        if (referencesOuterQuery) {
            return true;
        }
    }

    return false;
}

/*
 * ReplaceRTERelationWithRteSubquery turns the given relation range table entry
 * into a subquery RTE in place. The relation's restrictions are pushed down
 * into that subquery, which is then recursively planned.
 *
 * The planned subquery is wrapped in one more (outer) subquery as a trick to
 * reduce network cost. There is currently no easy way to skip generating NULLs
 * for non-required columns. A subquery like (SELECT a, NULL, NULL FROM table)
 * would ship those NULLs over the network, where they also take up space.
 * Instead we generate (SELECT t.a, NULL, NULL FROM (SELECT a FROM table) t).
 * Only the inner subquery is recursively planned. The outer one is not, and
 * keeps the NULL columns so that the query stays correct.
 *
 * Errors out if the freshly built inner subquery could not be recursively
 * planned.
 */
void ReplaceRTERelationWithRteSubquery(RangeTblEntry* rangeTableEntry,
                                       List* requiredAttrNumbers,
                                       RecursivePlanningContext* context,
                                       RTEPermissionInfo* perminfo)
{
    Query* innerSubquery =
        WrapRteRelationIntoSubquery(rangeTableEntry, requiredAttrNumbers, perminfo);
    List* fullTargetList =
        CreateAllTargetListForRelation(rangeTableEntry->relid, requiredAttrNumbers);

    /* push a private copy of the relation's restrictions into the subquery */
    List* relationRestrictions = static_cast<List*>(copyObject(
        GetRestrictInfoListForRelation(rangeTableEntry, context->plannerRestrictionContext)));
    innerSubquery->jointree->quals = (Node*)make_ands_explicit(relationRestrictions);

    /*
     * The quals used to reference the relation through its index in the outer
     * rtable. Inside the new subquery, the relation is the only RTE, so the
     * varnos are re-pointed to it:
     *   before: rtable [rte1, current_rte, rte3, ...]
     *   after:  rtable [rte1, subquery[current_rte], rte3, ...], where the
     *           subquery has its own single-entry rtable.
     */
    UpdateVarNosInNode(innerSubquery->jointree->quals, SINGLE_RTE_INDEX);

    /* turn the relation RTE into a subquery RTE */
    rangeTableEntry->rtekind = RTE_SUBQUERY;
    rangeTableEntry->subquery = innerSubquery;

    /*
     * The inner subquery already carries its own copy of the relation (and
     * its inheritance flag), so the new subquery RTE must not be treated as
     * inherited.
     */
    rangeTableEntry->inh = false;

    if (IsLoggableLevel(DEBUG1)) {
        char* displayName = GetRelationNameAndAliasName(rangeTableEntry);
        ereport(DEBUG1,
                (errmsg("Wrapping relation %s to a subquery", displayName)));
    }

    /* we just built this subquery, so it has to be plannable on its own */
    if (!RecursivelyPlanSubquery(innerSubquery, context)) {
        ereport(ERROR,
                (errmsg("unexpected state: query should have been recursively planned")));
    }

    rangeTableEntry->subquery = CreateOuterSubquery(rangeTableEntry, fullTargetList);
}

/*
 * GetRelationNameAndAliasName returns the double-quoted relation name,
 * followed by a space and the double-quoted alias name when the range table
 * entry has an alias. The result is a freshly allocated string.
 */
static char* GetRelationNameAndAliasName(RangeTblEntry* rangeTableEntry)
{
    StringInfo displayName = makeStringInfo();
    appendStringInfo(displayName, "\"%s\"", get_rel_name(rangeTableEntry->relid));

    char* aliasName =
        rangeTableEntry->alias ? rangeTableEntry->alias->aliasname : NULL;
    if (aliasName != NULL) {
        appendStringInfo(displayName, " \"%s\"", aliasName);
    }

    return displayName->data;
}

/*
 * CreateOuterSubquery builds a SELECT query whose only range table entry is a
 * copy of the given RTE and whose target list is outerSubqueryTargetList.
 *
 * The copy's eref->colnames is replaced with the names of the Var entries of
 * outerSubqueryTargetList, so it only exposes the required columns. The
 * target list itself is attached by reference, not copied.
 */
static Query* CreateOuterSubquery(RangeTblEntry* rangeTableEntry,
                                  List* outerSubqueryTargetList)
{
    /* copying the input RTE (instead of reusing it) preserves its rteIdentity */
    RangeTblEntry* wrappedRte =
        static_cast<RangeTblEntry*>(copyObject(rangeTableEntry));
    wrappedRte->eref->colnames =
        GenerateRequiredColNamesFromTargetList(outerSubqueryTargetList);

    /* the FROM clause references range table index 1, i.e. wrappedRte */
    RangeTblRef* fromRef = makeNode(RangeTblRef);
    fromRef->rtindex = 1;

    Query* outerQuery = makeNode(Query);
    outerQuery->commandType = CMD_SELECT;
    outerQuery->rtable = list_make1(wrappedRte);
    outerQuery->jointree = makeFromExpr(list_make1(fromRef), NULL);
    outerQuery->targetList = outerSubqueryTargetList;

    return outerQuery;
}

/*
 * GenerateRequiredColNamesFromTargetList returns a list of String nodes with
 * the resname of every target entry whose expression is a plain Var, in
 * target-list order. Entries with any other kind of expression are skipped.
 */
static List* GenerateRequiredColNamesFromTargetList(List* targetList)
{
    List* colNames = NIL;
    ListCell* cell = NULL;

    foreach (cell, targetList) {
        TargetEntry* targetEntry = (TargetEntry*)lfirst(cell);

        /*
         * The inner subquery's column names should only contain the required
         * columns: e.g. if only 'b' is selected out of ('a','b'), the colnames
         * should be ('b'), not ('a','b').
         */
        if (!IsA(targetEntry->expr, Var)) {
            continue;
        }

        colNames = lappend(colNames, makeString(targetEntry->resname));
    }

    return colNames;
}

/*
 * UpdateVarNosInNode sets varno to newVarNo on every Var that
 * pull_var_clause() finds in the given node. Aggregates and placeholders are
 * recursed into (PVC_RECURSE_AGGREGATES / PVC_RECURSE_PLACEHOLDERS), so Vars
 * nested inside them are rewritten too. The Vars are modified in place.
 */
void UpdateVarNosInNode(Node* node, Index newVarNo)
{
    List* vars =
        pull_var_clause(node, PVC_RECURSE_AGGREGATES, PVC_RECURSE_PLACEHOLDERS);
    ListCell* varCell = NULL;

    foreach (varCell, vars) {
        Var* currentVar = (Var*)lfirst(varCell);
        currentVar->varno = newVarNo;
    }
}

/*
 * IsRecursivelyPlannableRelation reports whether the given RTE is a relation
 * that can be converted to a subquery. It must be an RTE_RELATION whose
 * relkind is one of:
 *   - partitioned relation (PARTTYPE_PARTITIONED_RELATION)
 *   - plain table
 *   - materialized view
 *   - foreign table
 */
bool IsRecursivelyPlannableRelation(RangeTblEntry* rangeTableEntry)
{
    if (rangeTableEntry->rtekind != RTE_RELATION) {
        return false;
    }

    char kind = rangeTableEntry->relkind;
    bool isPlainOrPartitioned =
        (kind == PARTTYPE_PARTITIONED_RELATION) || (kind == RELKIND_RELATION);
    bool isMatViewOrForeign =
        (kind == RELKIND_MATVIEW) || (kind == RELKIND_FOREIGN_TABLE);

    return isPlainOrPartitioned || isMatViewOrForeign;
}

/*
 * ContainsLocalTableDistributedTableJoin reports whether the immediate RTEs in
 * rangeTableList include both:
 *   - an RTE that contains a distributed or reference table, and
 *   - a recursively plannable local table (or materialized view) RTE.
 * Citus local tables count as local tables here.
 */
bool ContainsLocalTableDistributedTableJoin(List* rangeTableList)
{
    bool sawDistributedOrReference = false;
    bool sawLocal = false;
    ListCell* rteCell = NULL;

    foreach (rteCell, rangeTableList) {
        RangeTblEntry* rte = (RangeTblEntry*)lfirst(rteCell);

        /* does this RTE contain a distributed or reference table? */
        List* singleRteList = list_make1(rte);
        if (FindNodeMatchingCheckFunctionInRangeTableList(
                singleRteList, IsDistributedOrReferenceTableRTE)) {
            sawDistributedOrReference = true;
            continue;
        }

        /* citus local tables are treated as local tables */
        if (IsRecursivelyPlannableRelation(rte) &&
            IsLocalTableRteOrMatView((Node*)rte)) {
            sawLocal = true;
        }
    }

    return sawLocal && sawDistributedOrReference;
}

/*
 * WrapFunctionsInSubqueries walks the immediate range table entries of the
 * query and wraps each qualifying function RTE inside a
 * (SELECT * FROM fnc() f) subquery. This lets those functions be executed on
 * the coordinator when necessary.
 *
 * Which RTEs qualify is decided by ShouldTransformRTE: currently, function
 * RTEs that are not LATERAL.
 */
static void WrapFunctionsInSubqueries(Query* query)
{
    List* rangeTableList = query->rtable;

    /*
     * With a single RTE (e.g. one function call and no joins), it is easy to
     * decide where to execute the function, so the parse tree is left alone.
     * This also avoids re-wrapping an already wrapped function call, because
     * a wrapped function always has exactly one RTE.
     *
     * When a function is joined with other subqueries and/or functions, it is
     * not trivial to decide whether to run it on the coordinator or on the
     * workers, so such functions may need to be wrapped.
     */
    if (list_length(rangeTableList) < 2) {
        return;
    }

    RangeTblEntry* rangeTableEntry = NULL;
    foreach_declared_ptr(rangeTableEntry, rangeTableList)
    {
        if (!ShouldTransformRTE(rangeTableEntry)) {
            continue;
        }

        /* rewrites the RTE in place; the range table list itself is unchanged */
        TransformFunctionRTE(rangeTableEntry);
    }
}

/*
 * TransformFunctionRTE wraps a given function RangeTblEntry inside a
 * (SELECT * FROM function() f) subquery.
 *
 * The RTE is modified in place. A copy of the original function RTE becomes
 * the sole range table entry of a new SELECT query, and rangeTblEntry itself
 * becomes an RTE_SUBQUERY pointing at that query. The subquery's target list
 * has one Var per output column of the function, referencing range table
 * index 1, with varattno/resno k+1 for column k.
 *
 * Column types are taken from, in order of preference:
 *   1. the function's result tuple descriptor, when it can be determined;
 *   2. the column definition list written in the query (funccoltypes);
 *   3. the result type of the function expression itself (scalar results).
 */
static void TransformFunctionRTE(RangeTblEntry* rangeTblEntry)
{
    Query* subquery = makeNode(Query);
    RangeTblRef* newRangeTableRef = makeNode(RangeTblRef);
    Var* targetColumn = NULL;
    TargetEntry* targetEntry = NULL;
    AttrNumber targetColumnIndex = 0;
    subquery->commandType = CMD_SELECT;

    /* copy the input rangeTblEntry to prevent cycles */
    RangeTblEntry* newRangeTableEntry =
        static_cast<RangeTblEntry*>(copyObject(rangeTblEntry));

    /* set the FROM expression to the subquery */
    subquery->rtable = list_make1(newRangeTableEntry);

    newRangeTableRef->rtindex = 1;
    subquery->jointree = makeFromExpr(list_make1(newRangeTableRef), NULL);

    /*
     * Determine the result type of the function.
     *
     * If the function's return type is not composite, or its row type can't
     * be determined, tupleDesc is NULL here (noError = true).
     */
    TupleDesc tupleDesc =
        (TupleDesc)get_expr_result_tupdesc(rangeTblEntry->funcexpr, true);

    if (tupleDesc) {
        /*
         * A sample function join that ends up here:
         *
         * CREATE FUNCTION f(..) RETURNS TABLE(c1 int, c2 text) AS .. ;
         * SELECT .. FROM table JOIN f(..) ON ( .. ) ;
         *
         * We iterate over the tuple descriptor's attributes, i.e. (c1 int, c2 text).
         */
        if (tupleDesc->natts > MaxAttrNumber) {
            ereport(ERROR, (errmsg("bad number of tuple descriptor attributes")));
        }
        AttrNumber natts = tupleDesc->natts;
        for (targetColumnIndex = 0; targetColumnIndex < natts; targetColumnIndex++) {
            FormData_pg_attribute* attribute =
                TupleDescAttr(tupleDesc, targetColumnIndex);
            Oid columnType = attribute->atttypid;
            char* columnName = attribute->attname.data;

            /*
             * TupleDesc indexing and varattno differ:
             *
             * varattno=0 corresponds to the whole row
             * varattno=1 corresponds to the first column, stored in tupDesc->attrs[0]
             *
             * That's why one is added to targetColumnIndex.
             */
            targetColumn =
                makeVar(1, targetColumnIndex + 1, columnType, -1, InvalidOid, 0);
            targetEntry = makeTargetEntry((Expr*)targetColumn, targetColumnIndex + 1,
                                          columnName, false);
            subquery->targetList = lappend(subquery->targetList, targetEntry);
        }
    } else {
        /*
         * tupleDesc is NULL in two different cases:
         *
         * 1. The function returns a record, but the attributes can't be
         * determined from the function definition alone. The column names and
         * types must then be defined explicitly in the query.
         *
         * 2. The function returns a non-composite type (e.g. int, text, jsonb ..).
         */
        ListCell* functionColumnName = NULL;

        List* functionColumnNames = rangeTblEntry->eref->colnames;
        foreach (functionColumnName, functionColumnNames) {
            char* columnName = strVal(lfirst(functionColumnName));
            Oid columnType = InvalidOid;

            if (list_length(rangeTblEntry->funccoltypes) > 0) {
                /*
                 * Case 1: use the column types given explicitly in the query.
                 * A sample function join that ends up here:
                 *
                 * CREATE FUNCTION get_set_of_records() RETURNS SETOF RECORD AS
                 * $cmd$
                 * SELECT x, x+1 FROM generate_series(0,4) f(x)
                 * $cmd$
                 * LANGUAGE SQL;
                 *
                 * SELECT *
                 * FROM table1 JOIN get_set_of_records() AS t2(x int, y int)
                 * ON (id = x);
                 *
                 * The function definition has no column names and types, so
                 * the user must state them in the query.
                 */
                columnType = list_nth_oid(rangeTblEntry->funccoltypes, targetColumnIndex);
            } else {
                /*
                 * Case 2: only functions returning simple types end up here.
                 * A sample function:
                 *
                 * CREATE FUNCTION add(integer, integer) RETURNS integer AS
                 * 'SELECT $1 + $2;'
                 * LANGUAGE SQL;
                 * SELECT * FROM table JOIN add(3,5) sum ON ( .. ) ;
                 *
                 * funcexpr is not necessarily a FuncExpr. Other expression
                 * nodes (e.g. a CoalesceExpr for FROM coalesce(..)) can appear
                 * here, so use exprType() rather than casting and reading
                 * FuncExpr->funcresulttype. For a FuncExpr, exprType() returns
                 * funcresulttype.
                 */
                columnType = exprType(rangeTblEntry->funcexpr);
            }

            /* column k is associated with varattno/resno k+1 */
            targetColumn =
                makeVar(1, targetColumnIndex + 1, columnType, -1, InvalidOid, 0);
            targetEntry = makeTargetEntry((Expr*)targetColumn, targetColumnIndex + 1,
                                          columnName, false);
            subquery->targetList = lappend(subquery->targetList, targetEntry);

            targetColumnIndex++;
        }
    }

    /* replace the function with the constructed subquery */
    rangeTblEntry->rtekind = RTE_SUBQUERY;
    rangeTblEntry->subquery = subquery;
}

/*
 * ShouldTransformRTE decides whether the given RTE should be wrapped in a
 * subquery.
 *
 * Only RTE_FUNCTION entries that are not LATERAL qualify; every other RTE is
 * left as it is. Not all functions are wrapped for now. As more functions
 * become supported in joins, these constraints can be relaxed.
 *
 * NOTE(review): no WITH ORDINALITY check is made here. Confirm whether such
 * function RTEs also need to be excluded.
 */
static bool ShouldTransformRTE(RangeTblEntry* rangeTableEntry)
{
    bool isFunction = (rangeTableEntry->rtekind == RTE_FUNCTION);
    bool isLateral = rangeTableEntry->lateral;

    return isFunction && !isLateral;
}

/*
 * BuildSubPlanResultQuery returns a query of the form:
 *
 * SELECT
 *   <target list>
 * FROM
 *   read_intermediate_result('<resultId>', '<copy format>')
 *   AS res (<column definition list>);
 *
 * columnAliasList is optional; it is useful for CTEs that have column aliases.
 *
 * The 'binary' copy format is used when every type in the target list
 * supports it; otherwise 'text' is used.
 */
Query* BuildSubPlanResultQuery(List* targetEntryList, List* columnAliasList,
                               char* resultId)
{
    Oid readResultFunctionId = CitusReadIntermediateResultFuncId();
    bool binaryFormat = CanUseBinaryCopyFormatForTargetList(targetEntryList);

    /* the result id is passed to the function as a text constant */
    Const* resultIdArgument = makeNode(Const);
    resultIdArgument->consttype = TEXTOID;
    resultIdArgument->consttypmod = -1;
    resultIdArgument->constlen = -1;
    resultIdArgument->constbyval = false;
    resultIdArgument->constisnull = false;
    resultIdArgument->constvalue = CStringGetTextDatum(resultId);
    resultIdArgument->location = -1;

    Query* readQuery = BuildReadIntermediateResultsQuery(
        targetEntryList, columnAliasList, resultIdArgument, readResultFunctionId,
        binaryFormat);
    return readQuery;
}

/*
 * BuildReadIntermediateResultsArrayQuery returns a query of the form:
 *
 * SELECT
 *   <target list>
 * FROM
 *   read_intermediate_results(ARRAY['<resultId>', ...]::text[], '<copy format>')
 *   AS res (<column definition list>);
 *
 * columnAliasList is optional; it is useful for CTEs that have column aliases.
 *
 * The copy format is 'binary' when useBinaryCopyFormat is true, 'text' otherwise.
 */
Query* BuildReadIntermediateResultsArrayQuery(List* targetEntryList,
                                              List* columnAliasList, List* resultIdList,
                                              bool useBinaryCopyFormat)
{
    Oid readResultsFunctionId = CitusReadIntermediateResultArrayFuncId();

    /* the result ids are passed to the function as a text[] constant */
    Const* resultIdsArgument = makeNode(Const);
    resultIdsArgument->consttype = TEXTARRAYOID;
    resultIdsArgument->consttypmod = -1;
    resultIdsArgument->constlen = -1;
    resultIdsArgument->constbyval = false;
    resultIdsArgument->constisnull = false;
    resultIdsArgument->constvalue = PointerGetDatum(strlist_to_textarray(resultIdList));
    resultIdsArgument->location = -1;

    Query* readQuery = BuildReadIntermediateResultsQuery(
        targetEntryList, columnAliasList, resultIdsArgument, readResultsFunctionId,
        useBinaryCopyFormat);
    return readQuery;
}

/*
 * BuildReadIntermediateResultsQuery is the common code for generating
 * queries that read from result files. It is used by
 * BuildReadIntermediateResultsArrayQuery and BuildSubPlanResultQuery.
 *
 * The result is SELECT <cols> FROM <functionOid>(resultIdConst, <format>)
 * AS intermediate_result(<column definition list>). It has one output
 * column per non-junk entry of targetEntryList, with the same type, typmod
 * and collation as that entry's expression. Output column k is named after
 * the k-th entry of columnAliasList when one exists, otherwise after the
 * original resname.
 */
static Query* BuildReadIntermediateResultsQuery(List* targetEntryList,
                                                List* columnAliasList,
                                                Const* resultIdConst, Oid functionOid,
                                                bool useBinaryCopyFormat)
{
    List* funcColNames = NIL;
    List* funcColTypes = NIL;
    List* funcColTypMods = NIL;
    List* funcColCollations = NIL;
    ListCell* targetEntryCell = NULL;
    List* targetList = NIL;
    int columnNumber = 1;
    int columnAliasCount = list_length(columnAliasList);

    /* build the target list and column definition list */
    foreach (targetEntryCell, targetEntryList) {
        TargetEntry* targetEntry = (TargetEntry*)lfirst(targetEntryCell);

        /* junk columns are not part of the stored result */
        if (targetEntry->resjunk) {
            continue;
        }

        Node* targetExpr = (Node*)targetEntry->expr;
        char* columnName = targetEntry->resname;
        Oid columnType = exprType(targetExpr);
        int32 columnTypMod = exprTypmod(targetExpr); /* typmods are signed */
        Oid columnCollation = exprCollation(targetExpr);

        /*
         * Types and collations are OID lists and typmods an integer list,
         * matching how PostgreSQL builds funccoltypes / funccoltypmods /
         * funccolcollations for function RTEs. Readers such as list_nth_oid()
         * assert the list kind.
         */
        funcColNames = lappend(funcColNames, makeString(columnName));
        funcColTypes = lappend_oid(funcColTypes, columnType);
        funcColTypMods = lappend_int(funcColTypMods, columnTypMod);
        funcColCollations = lappend_oid(funcColCollations, columnCollation);

        Var* functionColumnVar = makeNode(Var);
        functionColumnVar->varno = 1;
        functionColumnVar->varattno = columnNumber;
        functionColumnVar->vartype = columnType;
        functionColumnVar->vartypmod = columnTypMod;
        functionColumnVar->varcollid = columnCollation;
        functionColumnVar->varlevelsup = 0;
        functionColumnVar->location = -1;

        TargetEntry* newTargetEntry = makeNode(TargetEntry);
        newTargetEntry->expr = (Expr*)functionColumnVar;
        newTargetEntry->resno = columnNumber;

        /*
         * Rename the column only if a column alias is defined. The alias count
         * can be less than the actual column count: the provided aliases are
         * used, and the original names are kept for the remaining columns.
         */
        if (columnAliasCount >= columnNumber) {
            auto columnAlias = (Value*)list_nth(columnAliasList, columnNumber - 1);
            Assert(IsA(columnAlias, String));
            newTargetEntry->resname = strVal(columnAlias);
        } else {
            newTargetEntry->resname = columnName;
        }
        newTargetEntry->resjunk = false;

        targetList = lappend(targetList, newTargetEntry);

        columnNumber++;
    }

    /* build the citus_copy_format parameter for the call to read_intermediate_result */
    Oid copyFormatId = useBinaryCopyFormat ? BinaryCopyFormatId() : TextCopyFormatId();

    Const* resultFormatConst = makeNode(Const);
    resultFormatConst->consttype = CitusCopyFormatTypeId();
    resultFormatConst->consttypmod = -1;
    resultFormatConst->constlen = sizeof(Oid);
    resultFormatConst->constvalue = ObjectIdGetDatum(copyFormatId);
    resultFormatConst->constbyval = true;
    resultFormatConst->constisnull = false;
    resultFormatConst->location = -1;

    /* build the call to read_intermediate_result */
    FuncExpr* funcExpr = makeNode(FuncExpr);
    funcExpr->funcid = functionOid;
    funcExpr->funcresulttype = RECORDOID;
    funcExpr->funcretset = true;
    funcExpr->funcvariadic = false;
    funcExpr->funcformat = COERCE_EXPLICIT_CALL;
    funcExpr->funccollid = 0;
    funcExpr->inputcollid = 0;
    funcExpr->location = -1;
    funcExpr->args = list_make2(resultIdConst, resultFormatConst);

    Alias* funcAlias = makeNode(Alias);
    funcAlias->aliasname = "intermediate_result";
    funcAlias->colnames = funcColNames;

    RangeTblEntry* rangeTableEntry = makeNode(RangeTblEntry);
    rangeTableEntry->rtekind = RTE_FUNCTION;
    rangeTableEntry->funcexpr = (Node*)funcExpr;
    rangeTableEntry->inFromCl = true;
    rangeTableEntry->eref = funcAlias;

    rangeTableEntry->funccoltypes = funcColTypes;
    rangeTableEntry->funccoltypmods = funcColTypMods;
    rangeTableEntry->funccolcollations = funcColCollations;

    /* build the join tree using the read_intermediate_result RTE */
    RangeTblRef* rangeTableRef = makeNode(RangeTblRef);
    rangeTableRef->rtindex = 1;

    FromExpr* joinTree = makeNode(FromExpr);
    joinTree->fromlist = list_make1(rangeTableRef);

    /* build the SELECT query */
    Query* resultQuery = makeNode(Query);
    resultQuery->commandType = CMD_SELECT;
    resultQuery->rtable = list_make1(rangeTableEntry);
    resultQuery->jointree = joinTree;
    resultQuery->targetList = targetList;

    return resultQuery;
}

/*
 * GenerateResultId returns the identifier of the intermediate result produced
 * by subplan subPlanId of plan planId, formatted as "<planId>_<subPlanId>".
 * The returned string is freshly allocated.
 */
char* GenerateResultId(uint64 planId, uint32 subPlanId)
{
    StringInfoData resultIdBuffer;
    initStringInfo(&resultIdBuffer);

    appendStringInfo(&resultIdBuffer, UINT64_FORMAT "_%u", planId, subPlanId);

    return resultIdBuffer.data;
}

/*
 * GeneratingSubplans reports whether subplan generation is in progress. It
 * returns true when the session's recursive planning depth is positive.
 */
bool GeneratingSubplans(void)
{
    const auto planningDepth = Session_ctx::PlanCtx().recursivePlanningDepth;
    return planningDepth > 0;
}
